package cn.jwis.service.impl;

import cn.jwis.domain.MessageVo;
import cn.jwis.exception.BaseException;
import cn.jwis.mqConfig.MqConnectionConfig;
import cn.jwis.service.interf.MessageService;
import cn.jwis.mqConfig.ConsumerGenerate;
import cn.jwis.mqConfig.CustomizeDynamicConsumerContainer;
import cn.jwis.mqConfig.DynamicConsumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;

import java.io.IOException;
import java.util.Map;

/**
 * @author :王泽华
 * @Title: MessageServiceimpl
 * @ProjectName mq-message
 * @Description: TODO
 * @date 2019/8/514:00
 */
@Service
public class MessageServiceimpl implements MessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${spring.rabbitmq.exchange}")
    private  String exchange;

    @Autowired
    ConnectionFactory connectionFactory;

    @Autowired
    private RabbitAdmin rabbitAdmin;
    @Autowired
    private CustomizeDynamicConsumerContainer customizeDynamicConsumerContainer;

    @Autowired
    private MqConnectionConfig.AmqpProducer amqpProducer;

    @Override
    public void sendMessage(MessageVo message)throws Exception {

        rabbitTemplate.convertAndSend(message.getQueueName(),message.getData());

    }

    @Override
    public void createTopic(String topicName) throws Exception{
        Connection connection = connectionFactory.createConnection();
        //从连接中获取一个通道
        Channel channel = connection.createChannel(false);
        //exchange有4个类型：direct, topic, headers ,fanout
        channel.exchangeDeclare(exchange, "topic");
        channel.queueDeclare(topicName,true,false,false,null);
        channel.queueBind(topicName, exchange, topicName);
        //关闭通道和连接
        channel.close();
        connection.close();

    }


    @Override
    public void dynamicCreateConsumer(String topicName) throws Exception {
        Map<String, DynamicConsumer> allQueueContainerMap = customizeDynamicConsumerContainer.customizeDynamicConsumerContainer;
        DynamicConsumer consumer = null;
        try {
            //创建消费者
            consumer = ConsumerGenerate
                    .genConsumer(connectionFactory, rabbitAdmin,exchange, topicName, topicName
                            , false, true, true);
        } catch (Exception e) {
            throw  new BaseException("系统异常"+e);
        }
        allQueueContainerMap.put(topicName, consumer);
        //启动消费者
        consumer.start();
        //发送消息到交换机
        amqpProducer.publishMsg(exchange, topicName, "Hello MQ!");
    }

    //暂停消费者
    @Override
    public void stopConsumer(String topicName) throws Exception {
        Map<String, DynamicConsumer> allQueueContainerMap = customizeDynamicConsumerContainer.customizeDynamicConsumerContainer;
        DynamicConsumer dynamicConsumer = allQueueContainerMap.get(topicName);
        dynamicConsumer.stop();
    }

    @Override
    public void sendFileMessage(String queueName, MultipartFile file) {
        byte[] bytes = new byte[0];
        try {
            bytes = file.getBytes();
        } catch (IOException e) {
            e.printStackTrace();
        }
        rabbitTemplate.convertAndSend(exchange,queueName,bytes);
    }
}
